/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.client.cli;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.client.gateway.result.MaterializedResult;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.utils.print.PrintStyle;

import org.jline.keymap.KeyMap;
import org.jline.terminal.Terminal;
import org.jline.utils.AttributedString;
import org.jline.utils.AttributedStringBuilder;
import org.jline.utils.AttributedStyle;
import org.jline.utils.InfoCmp.Capability;

import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.table.client.cli.CliUtils.TIME_FORMATTER;
import static org.apache.flink.table.client.cli.CliUtils.formatTwoLineHelpOptions;
import static org.apache.flink.table.client.cli.CliUtils.normalizeColumn;
import static org.apache.flink.table.client.cli.CliUtils.repeatChar;
import static org.jline.keymap.KeyMap.ctrl;
import static org.jline.keymap.KeyMap.esc;
import static org.jline.keymap.KeyMap.key;

/** CLI view for retrieving and displaying a table. */
public class CliTableResultView extends CliResultView<CliTableResultView.ResultTableOperation> {

    private int pageCount;
    private int page;
    private LocalTime lastRetrieval;
    private int previousResultsPage;
    private final MaterializedResult materializedResult;

    private static final int DEFAULT_REFRESH_INTERVAL = 3; // every 1s
    private static final int MIN_REFRESH_INTERVAL = 1; // every 100ms
    private static final int LAST_PAGE = 0;

    public CliTableResultView(Terminal terminal, ResultDescriptor resultDescriptor) {
        this(terminal, resultDescriptor, resultDescriptor.createResult());
    }

    @VisibleForTesting
    public CliTableResultView(
            Terminal terminal,
            ResultDescriptor resultDescriptor,
            MaterializedResult materializedResult) {
        super(
                terminal,
                resultDescriptor,
                PrintStyle.tableauWithTypeInferredColumnWidths(
                        resultDescriptor.getResultSchema(),
                        resultDescriptor.getRowDataStringConverter(),
                        resultDescriptor.maxColumnWidth(),
                        false,
                        false));

        this.refreshInterval = DEFAULT_REFRESH_INTERVAL;
        this.pageCount = 1;
        this.page = LAST_PAGE;
        this.materializedResult = materializedResult;

        this.previousResults = Collections.emptyList();
        this.previousResultsPage = 1;
        this.results = Collections.emptyList();
    }

    // --------------------------------------------------------------------------------------------

    @Override
    protected String[] getRow(String[] resultRow) {
        return resultRow;
    }

    @Override
    void cleanUpQuery() {
        materializedResult.close();
    }

    @Override
    protected void refresh() {
        // take snapshot
        TypedResult<Integer> result;
        try {
            result = materializedResult.snapshot(getVisibleMainHeight());
        } catch (SqlExecutionException e) {
            close(e);
            return;
        }

        // stop retrieval if job is done
        if (result.getType() == TypedResult.ResultType.EOS) {
            stopRetrieval(false);
        }
        // update page
        else if (result.getType() == TypedResult.ResultType.PAYLOAD) {
            int newPageCount = result.getPayload();
            pageCount = newPageCount;
            if (page > newPageCount) {
                page = LAST_PAGE;
            }
            updatePage();
        }

        lastRetrieval = LocalTime.now();

        // reset view
        resetAllParts();
    }

    @Override
    protected KeyMap<ResultTableOperation> getKeys() {
        final KeyMap<ResultTableOperation> keys = new KeyMap<>();
        keys.setAmbiguousTimeout(200); // make ESC quicker
        keys.bind(ResultTableOperation.QUIT, "q", "Q", esc(), ctrl('c'));
        keys.bind(ResultTableOperation.REFRESH, "r", "R", key(terminal, Capability.key_f5));
        keys.bind(ResultTableOperation.UP, "w", "W", key(terminal, Capability.key_up));
        keys.bind(ResultTableOperation.DOWN, "s", "S", key(terminal, Capability.key_down));
        keys.bind(ResultTableOperation.LEFT, "a", "A", key(terminal, Capability.key_left));
        keys.bind(ResultTableOperation.RIGHT, "d", "D", key(terminal, Capability.key_right));
        keys.bind(ResultTableOperation.OPEN, "o", "O", "\r");
        keys.bind(ResultTableOperation.GOTO, "g", "G");
        keys.bind(ResultTableOperation.NEXT, "n", "N");
        keys.bind(ResultTableOperation.PREV, "p", "P");
        keys.bind(ResultTableOperation.LAST, "l", "L", key(terminal, Capability.key_end));
        keys.bind(ResultTableOperation.INC_REFRESH, "+");
        keys.bind(ResultTableOperation.DEC_REFRESH, "-");
        return keys;
    }

    @Override
    protected void evaluate(ResultTableOperation operation, String binding) {
        switch (operation) {
            case QUIT:
                close();
                break;
            case REFRESH:
                refresh();
                break;
            case UP:
                selectRowUp();
                break;
            case DOWN:
                selectRowDown();
                break;
            case OPEN:
                openRow();
                break;
            case GOTO:
                gotoPage();
                break;
            case NEXT:
                gotoNextPage();
                break;
            case PREV:
                gotoPreviousPage();
                break;
            case LAST:
                gotoLastPage();
                break;
            case LEFT:
                scrollLeft();
                break;
            case RIGHT:
                scrollRight();
                break;
            case INC_REFRESH:
                increaseRefreshInterval();
                break;
            case DEC_REFRESH:
                decreaseRefreshInterval(MIN_REFRESH_INTERVAL);
                break;
        }
    }

    @Override
    protected String getTitle() {
        return CliStrings.RESULT_TITLE + " (" + CliStrings.RESULT_TABLE + ")";
    }

    @Override
    protected List<AttributedString> computeHeaderLines() {
        final AttributedStringBuilder statusLine = new AttributedStringBuilder();
        statusLine.style(AttributedStyle.INVERSE);
        // left
        final String left;
        if (isRetrieving()) {
            left =
                    CliStrings.DEFAULT_MARGIN
                            + CliStrings.RESULT_REFRESH_INTERVAL
                            + ' '
                            + REFRESH_INTERVALS.get(refreshInterval).f0;
        } else {
            left = CliStrings.DEFAULT_MARGIN + CliStrings.RESULT_STOPPED;
        }
        // middle
        final StringBuilder middleBuilder = new StringBuilder();
        middleBuilder.append(CliStrings.RESULT_PAGE);
        middleBuilder.append(' ');
        if (page == LAST_PAGE) {
            middleBuilder.append(CliStrings.RESULT_LAST_PAGE);
        } else {
            middleBuilder.append(page);
        }
        middleBuilder.append(CliStrings.RESULT_PAGE_OF);
        middleBuilder.append(pageCount);
        final String middle = middleBuilder.toString();
        // right
        final String right;
        if (lastRetrieval == null) {
            right =
                    CliStrings.RESULT_LAST_REFRESH
                            + ' '
                            + CliStrings.RESULT_REFRESH_UNKNOWN
                            + CliStrings.DEFAULT_MARGIN;
        } else {
            right =
                    CliStrings.RESULT_LAST_REFRESH
                            + ' '
                            + lastRetrieval.format(TIME_FORMATTER)
                            + CliStrings.DEFAULT_MARGIN;
        }
        // all together
        final int totalLeftSpace = getWidth() - middle.length();
        final int leftSpace = totalLeftSpace / 2 - left.length();
        statusLine.append(left);
        repeatChar(statusLine, ' ', leftSpace);
        statusLine.append(middle);
        final int rightSpacing = getWidth() - statusLine.length() - right.length();
        repeatChar(statusLine, ' ', rightSpacing);
        statusLine.append(right);

        return Arrays.asList(statusLine.toAttributedString(), AttributedString.EMPTY);
    }

    @Override
    protected List<AttributedString> computeMainHeaderLines() {
        final AttributedStringBuilder schemaHeader = new AttributedStringBuilder();

        IntStream.range(0, resultDescriptor.getResultSchema().getColumnCount())
                .forEach(
                        idx -> {
                            schemaHeader.style(AttributedStyle.DEFAULT);
                            schemaHeader.append(' ');
                            String columnName =
                                    resultDescriptor.getResultSchema().getColumnNames().get(idx);
                            schemaHeader.style(AttributedStyle.DEFAULT.underline());
                            normalizeColumn(schemaHeader, columnName, columnWidths[idx]);
                        });

        return Collections.singletonList(schemaHeader.toAttributedString());
    }

    @Override
    protected List<AttributedString> computeFooterLines() {
        return formatTwoLineHelpOptions(getWidth(), getHelpOptions());
    }

    // --------------------------------------------------------------------------------------------

    private void updatePage() {
        // retrieve page
        final int retrievalPage = page == LAST_PAGE ? pageCount : page;
        final List<RowData> rows;
        try {
            rows = materializedResult.retrievePage(retrievalPage);
        } catch (SqlExecutionException e) {
            close(e);
            return;
        }

        // convert page
        final List<String[]> stringRows =
                rows.stream()
                        .map(resultDescriptor.getRowDataStringConverter()::convert)
                        .collect(Collectors.toList());

        // update results
        if (previousResultsPage == retrievalPage) {
            // only use the previous results if the current page number has not changed
            // this allows for updated results when the key space remains constant
            previousResults = results;
        } else {
            previousResults = null;
            previousResultsPage = retrievalPage;
        }

        results = stringRows;

        // check if selected row is still valid
        if (selectedRow != NO_ROW_SELECTED) {
            if (selectedRow >= results.size()) {
                selectedRow = NO_ROW_SELECTED;
            }
        }

        // reset view
        resetAllParts();
    }

    private List<Tuple2<String, String>> getHelpOptions() {
        final List<Tuple2<String, String>> options = new ArrayList<>();

        options.add(Tuple2.of("Q", CliStrings.RESULT_QUIT));
        options.add(Tuple2.of("R", CliStrings.RESULT_REFRESH));

        options.add(Tuple2.of("+", CliStrings.RESULT_INC_REFRESH));
        options.add(Tuple2.of("-", CliStrings.RESULT_DEC_REFRESH));

        options.add(Tuple2.of("G", CliStrings.RESULT_GOTO));
        options.add(Tuple2.of("L", CliStrings.RESULT_LAST));

        options.add(Tuple2.of("N", CliStrings.RESULT_NEXT));
        options.add(Tuple2.of("P", CliStrings.RESULT_PREV));

        options.add(Tuple2.of("O", CliStrings.RESULT_OPEN));

        return options;
    }

    private void gotoPage() {
        final CliInputView view =
                new CliInputView(
                        terminal,
                        CliStrings.INPUT_ENTER_PAGE + " [1 to " + pageCount + "]",
                        (s) -> {
                            // validate input
                            final int newPage;
                            try {
                                newPage = Integer.parseInt(s);
                            } catch (NumberFormatException e) {
                                return false;
                            }
                            return newPage > 0 && newPage <= pageCount;
                        });
        view.open(); // enter view
        if (view.getResult() != null) {
            page = Integer.parseInt(view.getResult());
            updatePage();
        }
    }

    private void gotoNextPage() {
        final int curPageIndex = page == LAST_PAGE ? pageCount : page;
        if (curPageIndex < pageCount) {
            page = curPageIndex + 1;
        }
        updatePage();
    }

    private void gotoPreviousPage() {
        final int curPageIndex = page == LAST_PAGE ? pageCount : page;
        if (curPageIndex > 1) {
            page = curPageIndex - 1;
        }
        updatePage();
    }

    private void gotoLastPage() {
        page = LAST_PAGE;
        updatePage();
    }

    // --------------------------------------------------------------------------------------------

    /** Available operations for this view. */
    public enum ResultTableOperation {
        QUIT, // leave view
        REFRESH, // refresh current table page
        UP, // row selection up
        DOWN, // row selection down
        OPEN, // shows a full row
        GOTO, // enter table page number
        NEXT, // next table page
        PREV, // previous table page
        LAST, // last table page
        LEFT, // scroll left if row is large
        RIGHT, // scroll right if row is large
        INC_REFRESH, // increase refresh rate
        DEC_REFRESH, // decrease refresh rate
    }
}
